package com.example.starter.pubsub.kafka;

import cn.hutool.core.util.ObjUtil;
import com.example.starter.pubsub.api.PubCallback;
import com.example.starter.pubsub.api.PubResult;
import com.example.starter.pubsub.api.PubTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

/**
 * @author 王令
 * @since 2023/6/30 10:50
 */
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "com.example.pub-sub", name = "type", havingValue = "kafka")
public class KafkaPubTemplate implements PubTemplate {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public PubResult syncSend(String topic, Object message) throws ExecutionException, InterruptedException {
        final SendResult<String, Object> sendResult = kafkaTemplate.send(topic, message).get();
        return new KafkaPubResultAdapter<>(sendResult);
    }

    @Override
    public void asyncSend(String topic, Object message, PubCallback callback) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                if (ObjUtil.isNotNull(callback)) {
                    callback.onFailure(ex);
                }
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                if (ObjUtil.isNotNull(callback)) {
                    callback.onSuccess(new KafkaPubResultAdapter<>(result));
                }
            }
        });
    }

    @Override
    public void asyncSend(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}
